今天我們要做一個服務監控工具,定期檢查多個服務的健康狀況,並在服務異常時發出警報。
這個在我做網頁專案時非常實用,當然網上有很多類似的工具甚至 Grafana/prometheus 也有相關的工具
但是一樣,我們是以學習為目的做開發,那我就在這裡展示一下如何用 rust 實現這項功能
內容可能看起來跟我們 Day13 很像,
但今天會更進一步,著重在各種警報通知管道,像是 Email、Slack、Webhook 等。
雖然先前的專案已經使用過這些套件,但還是簡單介紹一下每個 dependency 的用途:
reqwest: HTTP 客戶端,用於 HTTP 健康檢查
tokio: 異步運行時
serde: 序列化和反序列化配置
chrono: 時間處理
axum: Web 框架,提供 Dashboard API
lettre: Email 發送
rusqlite: SQLite 資料庫,存儲歷史記錄
Cargo.toml
[package]
name = "service_health_monitor"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1.35", features = ["full"] }
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
chrono = { version = "0.4", features = ["serde"] }
axum = "0.7"
tower-http = { version = "0.5", features = ["cors", "trace"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
lettre = "0.11"
rusqlite = { version = "0.30", features = ["bundled"] }
anyhow = "1.0"
config = "0.13"
一樣,我們先把資料結構弄出來
src/models.rs
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
/// Health state of a monitored service.
///
/// `Unknown` is used before the first check has run and when a check cannot be
/// evaluated (for example an unsupported HTTP method). The type is a fieldless enum,
/// so it is `Copy` and can be passed and compared by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum HealthStatus {
    /// The check succeeded.
    Healthy,
    /// The check ran and failed (bad status, connection error, timeout, non-zero exit).
    Unhealthy,
    /// No verdict is available.
    Unknown,
}
/// How a service's health is probed.
///
/// Internally tagged (`#[serde(tag = "type")]`): in the JSON config the variant name is
/// given in a `"type"` field next to the variant's own fields, e.g.
/// `{"type": "Tcp", "host": "localhost", "port": 6379, "timeout_secs": 5}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum CheckType {
/// Send an HTTP request and compare the response status code.
Http {
/// Target URL.
url: String,
/// HTTP method. The checker accepts `GET`, `POST` and `HEAD` (case-insensitive)
/// and reports any other value as `Unknown`.
method: String,
/// Status code that counts as healthy; any other code is `Unhealthy`.
expected_status: u16,
/// Maximum time to wait for the response, in seconds.
timeout_secs: u64,
},
/// Open a TCP connection to `host:port`; a successful connect counts as healthy.
Tcp {
host: String,
port: u16,
/// Connect timeout in seconds.
timeout_secs: u64,
},
/// Run an executable; a successful exit status counts as healthy.
Command {
/// Program to run. It is executed directly, not through a shell, so pipes,
/// globbing and variable expansion are not available.
command: String,
/// Arguments passed to `command` as-is.
args: Vec<String>,
/// Maximum run time in seconds.
timeout_secs: u64,
},
}
/// A notification target used when a service goes down or recovers.
///
/// Internally tagged by a `"type"` field in JSON, the same way `CheckType` is.
///
/// NOTE(review): `Debug` is derived, so formatting a value with `{:?}` prints the SMTP
/// password in clear text. Avoid logging these values.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum AlertConfig {
/// Plain-text email sent over SMTP with username/password authentication.
Email {
smtp_server: String,
smtp_port: u16,
username: String,
password: String,
/// Sender address; must parse as an email mailbox.
from: String,
/// Recipient addresses; each one is added as a `To:` recipient.
to: Vec<String>,
},
/// Slack incoming-webhook URL; the alert is posted as a message attachment.
Slack {
webhook_url: String,
},
/// Generic HTTP endpoint that receives a JSON POST describing the check result.
Webhook {
url: String,
/// Extra request headers, written as `[name, value]` pairs in JSON.
headers: Vec<(String, String)>,
},
}
/// Configuration for one monitored service, as read from the config file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceConfig {
/// Service name. It is the key for stored results and the `:name` in the API routes.
/// Uniqueness is not enforced here.
pub name: String,
/// Free-text description returned by the dashboard API.
pub description: String,
/// How to probe the service.
pub check: CheckType,
/// Seconds between scheduled checks.
pub check_interval_secs: u64,
/// Extra attempts after a failed check, so there are up to `retry_count + 1`
/// attempts per interval.
pub retry_count: u32,
/// Seconds to wait between retry attempts.
pub retry_delay_secs: u64,
/// Notification targets used on state changes.
pub alerts: Vec<AlertConfig>,
}
/// Outcome of one health-check attempt.
///
/// The monitor persists only the final attempt of each interval. The same shape is
/// returned by the history API.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthCheckResult {
pub service_name: String,
pub status: HealthStatus,
/// Human-readable explanation (status code, error text, timeout notice, ...).
pub message: String,
/// Wall-clock duration of the whole check in milliseconds, including time spent
/// waiting for a timeout.
pub response_time_ms: u64,
/// UTC time at which the check finished.
pub timestamp: DateTime<Utc>,
}
/// Aggregated view of a service, computed from the stored check history for the API.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceStatus {
pub name: String,
pub description: String,
/// Status of the most recent stored check (`Unknown` when none exists yet).
pub current_status: HealthStatus,
/// Time of the most recent stored check; the database layer falls back to "now"
/// when there is none.
pub last_check: DateTime<Utc>,
/// Time of the most recent `Healthy` check, if any.
pub last_healthy: Option<DateTime<Utc>>,
/// Number of consecutive `Unhealthy` results, counting back from the latest check.
pub consecutive_failures: u32,
/// Percentage (0-100) of `Healthy` checks over the last 24 hours; 100 when there
/// were no checks in that window.
pub uptime_percentage: f64,
}
/// Top-level configuration, deserialized from JSON by `load_config`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitorConfig {
/// Services to monitor; each one gets its own background task.
pub services: Vec<ServiceConfig>,
/// Path of the SQLite file that holds the check history.
pub database_path: String,
/// TCP port of the dashboard API, bound on all interfaces (`0.0.0.0`).
pub dashboard_port: u16,
}
src/checker.rs
use anyhow::{Context, Result};
use chrono::Utc;
use std::process::Command;
use std::time::{Duration, Instant};
use tokio::net::TcpStream;
use tokio::time::timeout;
use crate::models::{CheckType, HealthCheckResult, HealthStatus};
/// Runs HTTP, TCP and command health checks.
///
/// Holds a single `reqwest::Client` so that all HTTP checks share it.
pub struct HealthChecker {
client: reqwest::Client,
}
impl HealthChecker {
    /// Creates a checker backed by one shared `reqwest::Client`.
    ///
    /// The client-wide 30 s timeout is only a fallback. Every HTTP check sets its own
    /// per-request timeout from `timeout_secs`, and that takes precedence.
    ///
    /// # Panics
    ///
    /// Panics if the HTTP client cannot be built (for example when the TLS backend
    /// fails to initialise). That is a startup/environment problem, not a check failure.
    pub fn new() -> Self {
        let client = reqwest::Client::builder()
            .timeout(Duration::from_secs(30))
            .build()
            .expect("failed to build the HTTP client used for health checks");
        Self { client }
    }

    /// Runs one health check for `service_name` and times it.
    ///
    /// Never fails. Every problem (connection refused, timeout, non-zero exit, ...) is
    /// folded into the returned [`HealthCheckResult`] as `Unhealthy` or `Unknown`, with
    /// a human-readable message. `response_time_ms` covers the whole check.
    pub async fn check(&self, service_name: &str, check: &CheckType) -> HealthCheckResult {
        let start = Instant::now();
        let (status, message) = match check {
            CheckType::Http {
                url,
                method,
                expected_status,
                timeout_secs,
            } => {
                self.check_http(url, method, *expected_status, *timeout_secs)
                    .await
            }
            CheckType::Tcp {
                host,
                port,
                timeout_secs,
            } => self.check_tcp(host, *port, *timeout_secs).await,
            CheckType::Command {
                command,
                args,
                timeout_secs,
            } => self.check_command(command, args, *timeout_secs).await,
        };
        let response_time_ms = start.elapsed().as_millis() as u64;

        HealthCheckResult {
            service_name: service_name.to_string(),
            status,
            message,
            response_time_ms,
            timestamp: Utc::now(),
        }
    }

    /// Sends an HTTP request and compares the response status with `expected_status`.
    ///
    /// Only `GET`, `POST` and `HEAD` (case-insensitive) are supported. Any other method
    /// yields `Unknown` without sending anything.
    async fn check_http(
        &self,
        url: &str,
        method: &str,
        expected_status: u16,
        timeout_secs: u64,
    ) -> (HealthStatus, String) {
        let request = match method.to_uppercase().as_str() {
            "GET" => self.client.get(url),
            "POST" => self.client.post(url),
            "HEAD" => self.client.head(url),
            _ => {
                return (
                    HealthStatus::Unknown,
                    format!("Unsupported HTTP method: {}", method),
                )
            }
        };

        // The per-request timeout overrides the client's 30 s default. An outer
        // `tokio::time::timeout` alone would leave checks configured with more than
        // 30 s silently capped by the client timeout.
        match request
            .timeout(Duration::from_secs(timeout_secs))
            .send()
            .await
        {
            Ok(response) => {
                let status_code = response.status().as_u16();
                if status_code == expected_status {
                    (
                        HealthStatus::Healthy,
                        format!("HTTP {} OK (expected {})", status_code, expected_status),
                    )
                } else {
                    (
                        HealthStatus::Unhealthy,
                        format!("HTTP {} (expected {})", status_code, expected_status),
                    )
                }
            }
            Err(e) if e.is_timeout() => (
                HealthStatus::Unhealthy,
                format!("HTTP request timeout after {}s", timeout_secs),
            ),
            Err(e) => (
                HealthStatus::Unhealthy,
                format!("HTTP request failed: {}", e),
            ),
        }
    }

    /// Opens a TCP connection to `host:port`. The connection is dropped right after
    /// it succeeds.
    async fn check_tcp(&self, host: &str, port: u16, timeout_secs: u64) -> (HealthStatus, String) {
        let addr = format!("{}:{}", host, port);
        match timeout(Duration::from_secs(timeout_secs), TcpStream::connect(&addr)).await {
            Ok(Ok(_)) => (
                HealthStatus::Healthy,
                format!("TCP connection to {} successful", addr),
            ),
            Ok(Err(e)) => (
                HealthStatus::Unhealthy,
                format!("TCP connection to {} failed: {}", addr, e),
            ),
            Err(_) => (
                HealthStatus::Unhealthy,
                format!("TCP connection timeout after {}s", timeout_secs),
            ),
        }
    }

    /// Runs `command` with `args` (directly, not through a shell). A successful exit
    /// status counts as healthy.
    ///
    /// On timeout the child process is killed. The previous `spawn_blocking`-based
    /// version only abandoned the waiting task and left a hung command running in the
    /// background, where it accumulated on every interval.
    async fn check_command(
        &self,
        command: &str,
        args: &[String],
        timeout_secs: u64,
    ) -> (HealthStatus, String) {
        let command_str = format!("{} {}", command, args.join(" "));

        let mut child = tokio::process::Command::new(command);
        // Dropping the `output()` future, which is what `timeout` does when it fires,
        // now kills the child instead of leaking it.
        child.args(args).kill_on_drop(true);

        match timeout(Duration::from_secs(timeout_secs), child.output()).await {
            Ok(Ok(output)) if output.status.success() => (
                HealthStatus::Healthy,
                format!("Command '{}' executed successfully", command_str),
            ),
            Ok(Ok(output)) => {
                let stderr = String::from_utf8_lossy(&output.stderr);
                (
                    HealthStatus::Unhealthy,
                    format!("Command '{}' failed: {}", command_str, stderr.trim_end()),
                )
            }
            Ok(Err(e)) => (
                HealthStatus::Unhealthy,
                format!("Failed to execute command '{}': {}", command_str, e),
            ),
            Err(_) => (
                HealthStatus::Unhealthy,
                format!("Command timeout after {}s", timeout_secs),
            ),
        }
    }
}
src/alerter.rs
use anyhow::Result;
use lettre::message::{header, MultiPart, SinglePart};
use lettre::transport::smtp::authentication::Credentials;
use lettre::{Message, SmtpTransport, Transport};
use serde_json::json;
use crate::models::{AlertConfig, HealthCheckResult, HealthStatus};
/// Sends alert notifications by email, Slack or a generic webhook.
pub struct Alerter {
// HTTP client used for the Slack and webhook notifications.
client: reqwest::Client,
}
impl Alerter {
    /// Creates an alerter with a fresh HTTP client.
    pub fn new() -> Self {
        Self {
            client: reqwest::Client::new(),
        }
    }

    /// Delivers one notification describing `result` to the target in `config`.
    ///
    /// # Errors
    ///
    /// Returns an error if the message cannot be built (bad address) or delivered.
    /// This includes Slack/webhook endpoints that answer with a 4xx/5xx status.
    pub async fn send_alert(&self, config: &AlertConfig, result: &HealthCheckResult) -> Result<()> {
        match config {
            AlertConfig::Email {
                smtp_server,
                smtp_port,
                username,
                password,
                from,
                to,
            } => {
                self.send_email(
                    smtp_server,
                    *smtp_port,
                    username,
                    password,
                    from,
                    to,
                    result,
                )
                .await
            }
            AlertConfig::Slack { webhook_url } => self.send_slack(webhook_url, result).await,
            AlertConfig::Webhook { url, headers } => {
                self.send_webhook(url, headers, result).await
            }
        }
    }

    /// Sends a plain-text alert email via authenticated SMTP.
    ///
    /// Port 465 uses implicit TLS (SMTPS). Every other port (e.g. 587, used in the
    /// example config) connects in plain text and upgrades with STARTTLS.
    /// `SmtpTransport::relay` only speaks implicit TLS, so using it against 587 fails
    /// the handshake.
    async fn send_email(
        &self,
        smtp_server: &str,
        smtp_port: u16,
        username: &str,
        password: &str,
        from: &str,
        to: &[String],
        result: &HealthCheckResult,
    ) -> Result<()> {
        let subject = format!(
            "[{}] Service {} Health Alert",
            if result.status == HealthStatus::Healthy {
                "RECOVERED"
            } else {
                "CRITICAL"
            },
            result.service_name
        );
        let body = format!(
            r#"
Service Health Alert
Service: {}
Status: {:?}
Message: {}
Response Time: {}ms
Timestamp: {}
This is an automated alert from the Service Health Monitor.
"#,
            result.service_name,
            result.status,
            result.message,
            result.response_time_ms,
            result.timestamp.format("%Y-%m-%d %H:%M:%S UTC")
        );

        let mut email_builder = Message::builder().from(from.parse()?).subject(subject);
        for recipient in to {
            email_builder = email_builder.to(recipient.parse()?);
        }
        let email = email_builder.multipart(
            MultiPart::alternative().singlepart(
                SinglePart::builder()
                    .header(header::ContentType::TEXT_PLAIN)
                    .body(body),
            ),
        )?;

        let creds = Credentials::new(username.to_string(), password.to_string());
        let transport = if smtp_port == 465 {
            SmtpTransport::relay(smtp_server)?
        } else {
            SmtpTransport::starttls_relay(smtp_server)?
        };
        let mailer = transport.port(smtp_port).credentials(creds).build();

        // `SmtpTransport` is blocking; keep it off the async worker threads.
        tokio::task::spawn_blocking(move || mailer.send(&email)).await??;
        Ok(())
    }

    /// Posts the alert to a Slack incoming webhook as a coloured attachment.
    async fn send_slack(&self, webhook_url: &str, result: &HealthCheckResult) -> Result<()> {
        let color = match result.status {
            HealthStatus::Healthy => "good",
            HealthStatus::Unhealthy => "danger",
            HealthStatus::Unknown => "warning",
        };
        let status_emoji = match result.status {
            HealthStatus::Healthy => "✅",
            HealthStatus::Unhealthy => "🔴",
            HealthStatus::Unknown => "⚠️",
        };
        let payload = json!({
            "attachments": [{
                "color": color,
                "title": format!("{} Service: {}", status_emoji, result.service_name),
                "fields": [
                    {
                        "title": "Status",
                        "value": format!("{:?}", result.status),
                        "short": true
                    },
                    {
                        "title": "Response Time",
                        "value": format!("{}ms", result.response_time_ms),
                        "short": true
                    },
                    {
                        "title": "Message",
                        "value": &result.message,
                        "short": false
                    },
                    {
                        "title": "Timestamp",
                        "value": result.timestamp.format("%Y-%m-%d %H:%M:%S UTC").to_string(),
                        "short": false
                    }
                ],
                "footer": "Service Health Monitor",
                "ts": result.timestamp.timestamp()
            }]
        });

        self.client
            .post(webhook_url)
            .json(&payload)
            .send()
            .await?
            // Slack rejects bad URLs/payloads with a 4xx/5xx. Without this check the
            // failure would be logged upstream as "Alert sent".
            .error_for_status()?;
        Ok(())
    }

    /// POSTs a JSON summary of `result` to `url` with the configured extra headers.
    async fn send_webhook(
        &self,
        url: &str,
        headers: &[(String, String)],
        result: &HealthCheckResult,
    ) -> Result<()> {
        let mut request = self.client.post(url);
        for (key, value) in headers {
            request = request.header(key, value);
        }
        let payload = json!({
            "service_name": result.service_name,
            "status": result.status,
            "message": result.message,
            "response_time_ms": result.response_time_ms,
            "timestamp": result.timestamp.to_rfc3339()
        });

        // Treat non-2xx answers as delivery failures, consistent with `send_slack`.
        request.json(&payload).send().await?.error_for_status()?;
        Ok(())
    }
}
src/database.rs
use anyhow::Result;
use chrono::{DateTime, Duration, Utc};
use rusqlite::{params, Connection};
use std::sync::{Arc, Mutex};
use crate::models::{HealthCheckResult, HealthStatus, ServiceStatus};
/// SQLite-backed store for health-check history.
///
/// All access goes through one connection guarded by a `std::sync::Mutex`. That mutex
/// is not re-entrant, so code holding the guard must not call another method that
/// locks it again.
pub struct Database {
conn: Arc<Mutex<Connection>>,
}
impl Database {
pub fn new(path: &str) -> Result<Self> {
let conn = Connection::open(path)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS health_checks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
service_name TEXT NOT NULL,
status TEXT NOT NULL,
message TEXT NOT NULL,
response_time_ms INTEGER NOT NULL,
timestamp TEXT NOT NULL
)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_service_timestamp
ON health_checks(service_name, timestamp DESC)",
[],
)?;
Ok(Self {
conn: Arc::new(Mutex::new(conn)),
})
}
pub fn insert_check_result(&self, result: &HealthCheckResult) -> Result<()> {
let conn = self.conn.lock().unwrap();
conn.execute(
"INSERT INTO health_checks (service_name, status, message, response_time_ms, timestamp)
VALUES (?1, ?2, ?3, ?4, ?5)",
params![
&result.service_name,
format!("{:?}", result.status),
&result.message,
result.response_time_ms,
result.timestamp.to_rfc3339(),
],
)?;
Ok(())
}
pub fn get_service_status(&self, service_name: &str, description: &str) -> Result<ServiceStatus> {
let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT status, message, response_time_ms, timestamp
FROM health_checks
WHERE service_name = ?1
ORDER BY timestamp DESC
LIMIT 1"
)?;
let result = stmt.query_row(params![service_name], |row| {
let status_str: String = row.get(0)?;
let timestamp_str: String = row.get(3)?;
Ok((
match status_str.as_str() {
"Healthy" => HealthStatus::Healthy,
"Unhealthy" => HealthStatus::Unhealthy,
_ => HealthStatus::Unknown,
},
DateTime::parse_from_rfc3339(×tamp_str)
.unwrap()
.with_timezone(&Utc),
))
});
let (current_status, last_check) = result.unwrap_or((HealthStatus::Unknown, Utc::now()));
let last_healthy = self.get_last_healthy_time(service_name)?;
let consecutive_failures = self.get_consecutive_failures(service_name)?;
let uptime_percentage = self.calculate_uptime(service_name, Duration::hours(24))?;
Ok(ServiceStatus {
name: service_name.to_string(),
description: description.to_string(),
current_status,
last_check,
last_healthy,
consecutive_failures,
uptime_percentage,
})
}
fn get_last_healthy_time(&self, service_name: &str) -> Result<Option<DateTime<Utc>>> {
let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT timestamp
FROM health_checks
WHERE service_name = ?1 AND status = 'Healthy'
ORDER BY timestamp DESC
LIMIT 1"
)?;
let result = stmt.query_row(params![service_name], |row| {
let timestamp_str: String = row.get(0)?;
Ok(DateTime::parse_from_rfc3339(×tamp_str)
.unwrap()
.with_timezone(&Utc))
});
Ok(result.ok())
}
fn get_consecutive_failures(&self, service_name: &str) -> Result<u32> {
let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT status
FROM health_checks
WHERE service_name = ?1
ORDER BY timestamp DESC"
)?;
let mut rows = stmt.query(params![service_name])?;
let mut count = 0u32;
while let Some(row) = rows.next()? {
let status: String = row.get(0)?;
if status == "Unhealthy" {
count += 1;
} else {
break;
}
}
Ok(count)
}
fn calculate_uptime(&self, service_name: &str, duration: Duration) -> Result<f64> {
let conn = self.conn.lock().unwrap();
let since = Utc::now() - duration;
let mut stmt = conn.prepare(
"SELECT COUNT(*) as total,
SUM(CASE WHEN status = 'Healthy' THEN 1 ELSE 0 END) as healthy
FROM health_checks
WHERE service_name = ?1 AND timestamp >= ?2"
)?;
let (total, healthy): (i64, i64) = stmt.query_row(
params![service_name, since.to_rfc3339()],
|row| Ok((row.get(0)?, row.get(1)?))
)?;
if total == 0 {
Ok(100.0)
} else {
Ok((healthy as f64 / total as f64) * 100.0)
}
}
pub fn get_recent_checks(
&self,
service_name: &str,
limit: usize,
) -> Result<Vec<HealthCheckResult>> {
let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT service_name, status, message, response_time_ms, timestamp
FROM health_checks
WHERE service_name = ?1
ORDER BY timestamp DESC
LIMIT ?2"
)?;
let results = stmt
.query_map(params![service_name, limit], |row| {
let status_str: String = row.get(1)?;
let timestamp_str: String = row.get(4)?;
Ok(HealthCheckResult {
service_name: row.get(0)?,
status: match status_str.as_str() {
"Healthy" => HealthStatus::Healthy,
"Unhealthy" => HealthStatus::Unhealthy,
_ => HealthStatus::Unknown,
},
message: row.get(2)?,
response_time_ms: row.get(3)?,
timestamp: DateTime::parse_from_rfc3339(×tamp_str)
.unwrap()
.with_timezone(&Utc),
})
})?
.collect::<Result<Vec<_>, _>>()?;
Ok(results)
}
pub fn cleanup_old_records(&self, days: i64) -> Result<usize> {
let conn = self.conn.lock().unwrap();
let cutoff = Utc::now() - Duration::days(days);
let deleted = conn.execute(
"DELETE FROM health_checks WHERE timestamp < ?1",
params![cutoff.to_rfc3339()],
)?;
Ok(deleted)
}
}
src/monitor.rs
use anyhow::Result;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::time::{interval, Duration};
use tracing::{error, info, warn};
use crate::alerter::Alerter;
use crate::checker::HealthChecker;
use crate::database::Database;
use crate::models::{HealthStatus, MonitorConfig, ServiceConfig, ServiceStatus};
/// Drives periodic health checks for every configured service, persists the results
/// and sends alerts on state changes.
pub struct Monitor {
    /// Loaded configuration. It is `pub(crate)` because the dashboard API handlers read
    /// `state.monitor.config.services` directly. A private field is a compile error
    /// (E0616) from the sibling `api` module.
    pub(crate) config: MonitorConfig,
    checker: HealthChecker,
    alerter: Alerter,
    database: Arc<Database>,
    /// In-memory alerting state per service name, shared by all service tasks.
    service_states: Arc<RwLock<HashMap<String, ServiceState>>>,
}
/// Per-service alerting state kept in memory by the monitor.
struct ServiceState {
// Status recorded after the previous check (`Unknown` before the first one).
last_status: HealthStatus,
// Set to true after alerts are sent for an `Unhealthy` result. Reset when alerts are
// sent for any other status.
alert_sent: bool,
}
impl Monitor {
pub fn new(config: MonitorConfig) -> Result<Self> {
let database = Arc::new(Database::new(&config.database_path)?);
Ok(Self {
config,
checker: HealthChecker::new(),
alerter: Alerter::new(),
database,
service_states: Arc::new(RwLock::new(HashMap::new())),
})
}
pub async fn start(&self) -> Result<()> {
info!("Starting health monitor for {} services", self.config.services.len());
let mut handles = vec![];
for service in &self.config.services {
let service = service.clone();
let checker = self.checker.clone();
let alerter = self.alerter.clone();
let database = Arc::clone(&self.database);
let service_states = Arc::clone(&self.service_states);
let handle = tokio::spawn(async move {
Self::monitor_service(
service,
checker,
alerter,
database,
service_states,
)
.await
});
handles.push(handle);
}
self.start_cleanup_task();
for handle in handles {
if let Err(e) = handle.await {
error!("Service monitoring task failed: {}", e);
}
}
Ok(())
}
async fn monitor_service(
service: ServiceConfig,
checker: HealthChecker,
alerter: Alerter,
database: Arc<Database>,
service_states: Arc<RwLock<HashMap<String, ServiceState>>>,
) {
info!("Starting monitor for service: {}", service.name);
let mut interval = interval(Duration::from_secs(service.check_interval_secs));
loop {
interval.tick().await;
let mut retry_count = 0;
let mut last_result = None;
while retry_count <= service.retry_count {
let result = checker.check(&service.name, &service.check).await;
if result.status == HealthStatus::Healthy {
last_result = Some(result);
break;
}
if retry_count < service.retry_count {
warn!(
"Service {} check failed (attempt {}/{}), retrying...",
service.name,
retry_count + 1,
service.retry_count + 1
);
tokio::time::sleep(Duration::from_secs(service.retry_delay_secs)).await;
}
last_result = Some(result);
retry_count += 1;
}
if let Some(result) = last_result {
if let Err(e) = database.insert_check_result(&result) {
error!("Failed to save check result: {}", e);
}
let mut states = service_states.write().await;
let state = states
.entry(service.name.clone())
.or_insert_with(|| ServiceState {
last_status: HealthStatus::Unknown,
alert_sent: false,
});
let status_changed = state.last_status != result.status;
let should_alert = match result.status {
HealthStatus::Unhealthy => !state.alert_sent,
HealthStatus::Healthy => state.alert_sent,
HealthStatus::Unknown => false,
};
if status_changed || should_alert {
info!(
"Service {} status: {:?} -> {:?}",
service.name, state.last_status, result.status
);
for alert_config in &service.alerts {
if let Err(e) = alerter.send_alert(alert_config, &result).await {
error!("Failed to send alert: {}", e);
} else {
info!("Alert sent for service: {}", service.name);
}
}
state.alert_sent = result.status == HealthStatus::Unhealthy;
}
state.last_status = result.status;
}
}
}
fn start_cleanup_task(&self) {
let database = Arc::clone(&self.database);
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(86400)); // 每天執行一次
loop {
interval.tick().await;
match database.cleanup_old_records(30) {
Ok(deleted) => {
info!("Cleaned up {} old records", deleted);
}
Err(e) => {
error!("Failed to cleanup old records: {}", e);
}
}
}
});
}
pub async fn get_all_service_status(&self) -> Result<Vec<ServiceStatus>> {
let mut statuses = Vec::new();
for service in &self.config.services {
let status = self.database.get_service_status(&service.name, &service.description)?;
statuses.push(status);
}
Ok(statuses)
}
pub fn get_database(&self) -> Arc<Database> {
Arc::clone(&self.database)
}
}
/// Cloning is cheap: `reqwest::Client` is reference-counted internally, so every
/// clone shares the same client.
///
/// This is the standard `Clone` trait rather than an inherent `clone` method that
/// shadows it (clippy `should_implement_trait`). Existing `.clone()` call sites keep
/// working, and the type now also satisfies `T: Clone` bounds.
impl Clone for HealthChecker {
    fn clone(&self) -> Self {
        Self {
            client: self.client.clone(),
        }
    }
}
/// Cloning is cheap: `reqwest::Client` is reference-counted internally, so every
/// clone shares the same client.
///
/// This is the standard `Clone` trait rather than an inherent `clone` method that
/// shadows it (clippy `should_implement_trait`). Existing `.clone()` call sites keep
/// working, and the type now also satisfies `T: Clone` bounds.
impl Clone for Alerter {
    fn clone(&self) -> Self {
        Self {
            client: self.client.clone(),
        }
    }
}
src/api.rs
use axum::{
extract::{Path, State},
http::StatusCode,
response::Json,
routing::get,
Router,
};
use serde_json::{json, Value};
use std::sync::Arc;
use tower_http::cors::CorsLayer;
use crate::database::Database;
use crate::monitor::Monitor;
/// Shared state handed to every dashboard API handler.
pub struct ApiState {
/// The running monitor; used for the service list and per-service configuration.
pub monitor: Arc<Monitor>,
/// The database the monitor writes to (`main` obtains it via `Monitor::get_database`).
pub database: Arc<Database>,
}
/// Assembles the dashboard API.
///
/// Routes:
/// - `GET /api/health`: liveness of the API itself
/// - `GET /api/services`: status of every configured service
/// - `GET /api/services/:name`: status plus selected config for one service
/// - `GET /api/services/:name/history`: recent check results for one service
///
/// CORS is fully permissive so a browser dashboard on another origin can call it.
pub fn create_router(state: Arc<ApiState>) -> Router {
    let service_routes: Router<Arc<ApiState>> = Router::new()
        .route("/api/services", get(get_services))
        .route("/api/services/:name", get(get_service_detail))
        .route("/api/services/:name/history", get(get_service_history));

    let cors = CorsLayer::permissive();

    Router::new()
        .route("/api/health", get(health_check))
        .merge(service_routes)
        .layer(cors)
        .with_state(state)
}
async fn health_check() -> Json<Value> {
Json(json!({
"status": "ok",
"timestamp": chrono::Utc::now().to_rfc3339()
}))
}
/// Lists the aggregated status of every configured service.
///
/// Responds `500` with a plain-text message if the status query fails.
async fn get_services(
    State(state): State<Arc<ApiState>>,
) -> Result<Json<Value>, (StatusCode, String)> {
    let services = state.monitor.get_all_service_status().await.map_err(|err| {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("Failed to get services: {}", err),
        )
    })?;

    let body = json!({
        "services": services,
        "timestamp": chrono::Utc::now().to_rfc3339()
    });
    Ok(Json(body))
}
/// Returns the status of one configured service together with its check interval and
/// retry count.
///
/// Responds `404` if `name` is not in the configuration, and `500` if the status
/// query fails.
async fn get_service_detail(
    State(state): State<Arc<ApiState>>,
    Path(name): Path<String>,
) -> Result<Json<Value>, (StatusCode, String)> {
    // Look up the configured service first, so unknown names are rejected before
    // touching the database.
    let configured = state.monitor.config.services.iter().find(|s| s.name == name);
    let service_config = match configured {
        Some(cfg) => cfg,
        None => {
            return Err((
                StatusCode::NOT_FOUND,
                format!("Service '{}' not found", name),
            ))
        }
    };

    let status = state
        .database
        .get_service_status(&name, &service_config.description)
        .map_err(|err| {
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("Failed to get service status: {}", err),
            )
        })?;

    let body = json!({
        "service": status,
        "config": {
            "check_interval_secs": service_config.check_interval_secs,
            "retry_count": service_config.retry_count,
        },
        "timestamp": chrono::Utc::now().to_rfc3339()
    });
    Ok(Json(body))
}
/// Returns up to the 100 most recent stored check results for `name`, newest first.
///
/// Unknown names simply yield an empty history. Responds `500` if the query fails.
async fn get_service_history(
    State(state): State<Arc<ApiState>>,
    Path(name): Path<String>,
) -> Result<Json<Value>, (StatusCode, String)> {
    let history = state.database.get_recent_checks(&name, 100).map_err(|err| {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("Failed to get service history: {}", err),
        )
    })?;
    let count = history.len();

    let body = json!({
        "service_name": name,
        "history": history,
        "count": count,
        "timestamp": chrono::Utc::now().to_rfc3339()
    });
    Ok(Json(body))
}
mod alerter;
mod api;
mod checker;
mod database;
mod models;
mod monitor;
use anyhow::Result;
use std::sync::Arc;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use crate::api::{create_router, ApiState};
use crate::models::MonitorConfig;
use crate::monitor::Monitor;
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "info".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
let config = load_config()?;
let dashboard_port = config.dashboard_port;
let monitor = Arc::new(Monitor::new(config)?);
let database = monitor.get_database();
let api_state = Arc::new(ApiState {
monitor: Arc::clone(&monitor),
database,
});
let app = create_router(api_state);
let monitor_clone = Arc::clone(&monitor);
tokio::spawn(async move {
if let Err(e) = monitor_clone.start().await {
tracing::error!("Monitor error: {}", e);
}
});
let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", dashboard_port))
.await?;
tracing::info!("Dashboard API listening on http://0.0.0.0:{}", dashboard_port);
axum::serve(listener, app).await?;
Ok(())
}
fn load_config() -> Result<MonitorConfig> {
let config_str = std::fs::read_to_string("config.json")?;
let config: MonitorConfig = serde_json::from_str(&config_str)?;
Ok(config)
}
config.json.example
{
"services": [
{
"name": "web-api",
"description": "Main Web API Server",
"check": {
"type": "Http",
"url": "https://api.example.com/health",
"method": "GET",
"expected_status": 200,
"timeout_secs": 10
},
"check_interval_secs": 60,
"retry_count": 3,
"retry_delay_secs": 5,
"alerts": [
{
"type": "Slack",
"webhook_url": "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
},
{
"type": "Email",
"smtp_server": "smtp.gmail.com",
"smtp_port": 587,
"username": "your-email@gmail.com",
"password": "your-app-password",
"from": "monitor@example.com",
"to": ["admin@example.com", "devops@example.com"]
}
]
},
{
"name": "database",
"description": "PostgreSQL Database",
"check": {
"type": "Tcp",
"host": "db.example.com",
"port": 5432,
"timeout_secs": 5
},
"check_interval_secs": 30,
"retry_count": 2,
"retry_delay_secs": 3,
"alerts": [
{
"type": "Webhook",
"url": "https://your-webhook-endpoint.com/alert",
"headers": [
["Authorization", "Bearer YOUR_TOKEN"],
["Content-Type", "application/json"]
]
}
]
},
{
"name": "backup-service",
"description": "Daily Backup Service",
"check": {
"type": "Command",
"command": "systemctl",
"args": ["is-active", "backup.service"],
"timeout_secs": 10
},
"check_interval_secs": 300,
"retry_count": 1,
"retry_delay_secs": 10,
"alerts": [
{
"type": "Slack",
"webhook_url": "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
}
]
},
{
"name": "redis-cache",
"description": "Redis Cache Server",
"check": {
"type": "Tcp",
"host": "localhost",
"port": 6379,
"timeout_secs": 5
},
"check_interval_secs": 60,
"retry_count": 2,
"retry_delay_secs": 5,
"alerts": [
{
"type": "Slack",
"webhook_url": "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
}
]
}
],
"database_path": "health_monitor.db",
"dashboard_port": 8080
}
編譯專案
cargo build --release
準備配置文件
# 複製範例配置
cp config.json.example config.json
# 編輯配置文件
nvim config.json
運行監控
./target/release/service_health_monitor